Remove unnecessary calls to CodecPool.returnCompressor/returnDecompressor to avoid race conditions #103

Merged
merged 5 commits into twitter:master on Dec 5, 2014

Conversation

themodernlife
Contributor


The input/output stream implementations erroneously return the (de)compressors to the CodecPool on close, even though they didn't get the (de)compressors from the pool. The user who obtains a (de)compressor is responsible for returning it, and if both the user and the stream return it, the same instance ends up in the pool twice, which leads to a race condition.

This fixes #91 and #94.
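
To make the race concrete, here is a hypothetical caller that follows the documented CodecPool get/return contract (API names are from standard Hadoop; the scenario is an illustration, not code from this patch):

```java
import java.io.InputStream;
import org.apache.hadoop.io.compress.CodecPool;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.Decompressor;

void readCompressed(CompressionCodec codec, InputStream fileIn) throws Exception {
  Decompressor decompressor = CodecPool.getDecompressor(codec);
  InputStream in = codec.createInputStream(fileIn, decompressor);
  // ... read from 'in' ...
  in.close();                                 // bug: the stream itself returned
                                              // the decompressor to the pool here
  CodecPool.returnDecompressor(decompressor); // correct per the contract, but the
                                              // same instance is now pooled twice,
                                              // so two threads can check it out
                                              // concurrently
}
```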

There was some concern that this might break some code in the wild.

FWIW, I did a quick search on GitHub to see how people are using this library, and there really wasn't much to speak of outside of forks and Hadoop code. The code I did find uses CodecPool properly (getting and returning), so this patch wouldn't be an issue. It should also work cleanly with any Hadoop setup.

The only way I can see a user running into a problem is if they get the compressor/decompressor from the CodecPool and then don't return it. In that case they are using CodecPool incorrectly, which I hope is not common enough to justify keeping this fix out.

My main motivation is that this makes it possible to use Spark safely with LZO (see #91).

Hope you guys can incorporate it one way or another! Maybe a 0.5.x release (to clearly signal any potential change of behavior)?

Remove unnecessary calls to CodecPool.returnCompressor/returnDecompressor to avoid race conditions

The input/output stream implementations erroneously add the (de)compressors back to the CodecPool on close.
The user who creates the (de)compressor is responsible for doing this, and if they return a decompressor
as well, the same instance will be in the pool twice.
@sjlee
Collaborator

sjlee commented Dec 3, 2014

Thanks for the PR @themodernlife. Unfortunately, I'm afraid the problem is more complicated.

Most use cases go through LzopCodec to create the input stream. However, LzopCodec itself has two conflicting ways of managing the decompressor instances. For example, one can call createInputStream(InputStream, Decompressor) to obtain the input stream (like LineRecordReader does). In this case, in principle it is the caller that's responsible for returning the decompressor to the pool.

On the other hand, LzopCodec also has createInputStream(InputStream), in which case it is LzopCodec itself that obtains the decompressor (see line 113). In that case, LzopCodec relies on LzopInputStream.close() for returning the decompressor. There is no obvious lifecycle method that you could use to have LzopCodec return the decompressor.

So if we removed the call to return the decompressor within LzopInputStream.close(), we would in fact leak the decompressors for all use cases that go through LzopCodec.createInputStream(InputStream). I know for a fact that there are tons of use cases for that.
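
In code, the two conflicting paths look roughly like this (a sketch; variable names are assumptions):

```java
// Path 1: the caller borrows the decompressor and must return it
// (this is what LineRecordReader does).
Decompressor decompressor = CodecPool.getDecompressor(codec);
InputStream in1 = codec.createInputStream(raw, decompressor);
// ... later: in1.close(); CodecPool.returnDecompressor(decompressor);

// Path 2: LzopCodec borrows a decompressor internally, and the only
// hook it has for giving it back is LzopInputStream.close().
InputStream in2 = codec.createInputStream(raw);
// ... later: in2.close(); // must return the internal decompressor, or it leaks
```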

@rangadi
Contributor

rangadi commented Dec 3, 2014

When LzopCodec creates the decompressor itself, it could return a filtered input stream that returns the decompressor to the pool when the stream is closed.

That is probably all that is required.
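
A minimal sketch of that suggestion, assuming a hypothetical wrapper class name (not necessarily the code that was ultimately merged):

```java
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import org.apache.hadoop.io.compress.CodecPool;
import org.apache.hadoop.io.compress.Decompressor;

// Returns the internally borrowed decompressor to the pool exactly once,
// when the stream that borrowed it is closed.
class DecompressorReturningStream extends FilterInputStream {
  private Decompressor decompressor; // nulled after return

  DecompressorReturningStream(InputStream in, Decompressor decompressor) {
    super(in);
    this.decompressor = decompressor;
  }

  @Override
  public void close() throws IOException {
    try {
      super.close();
    } finally {
      if (decompressor != null) {
        CodecPool.returnDecompressor(decompressor);
        decompressor = null; // guard against double-return on repeated close()
      }
    }
  }
}
```

The same pattern would apply on the write side, with FilterOutputStream and CodecPool.returnCompressor.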

@sjlee
Collaborator

sjlee commented Dec 3, 2014

That sounds like a good approach. @themodernlife, would you like to update your PR to do that, both for the compressor and the decompressor?

@themodernlife
Contributor Author

PR updated. Good idea @rangadi!

```java
}

@Override
public int read() throws IOException {
```
Contributor
You should implement read(byte b[], int off, int len); otherwise, reads will be very slow.
Actually, even better is to make it extend FilterInputStream and override only close().
Same for the OutputStream.
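
For reference, if the wrapper stayed a plain InputStream subclass, the bulk-read override would look like this (assuming a field named in holding the wrapped stream); extending FilterInputStream avoids the issue entirely, since it already forwards read(byte[], int, int) to the wrapped stream:

```java
@Override
public int read(byte[] b, int off, int len) throws IOException {
  // Forward bulk reads to the wrapped stream instead of inheriting
  // InputStream's byte-at-a-time default loop.
  return in.read(b, off, len);
}
```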

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.*;
```
Contributor

Minor: I don't think the Hadoop codebase encourages * imports.

@rangadi
Contributor

rangadi commented Dec 4, 2014

+1. Thanks for the updates.

@sjlee
Collaborator

sjlee commented Dec 5, 2014

LGTM. Thanks @themodernlife for your contribution! I'll merge it shortly.

sjlee added a commit that referenced this pull request Dec 5, 2014
Remove unnecessary calls to CodecPool.returnCompressor/returnDecompressor to avoid race conditions
@sjlee sjlee merged commit d62701d into twitter:master Dec 5, 2014